1 /**
2 * Copyright 2014 Netflix, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 package rx.internal.operators;
17
18 import java.util.Queue;
19 import java.util.concurrent.ConcurrentLinkedQueue;
20 import java.util.concurrent.atomic.*;
21
22 import rx.*;
23 import rx.Observable.Operator;
24 import rx.exceptions.*;
25 import rx.functions.Func1;
26 import rx.internal.util.*;
27
28 /**
29 * Flattens a list of {@link Observable}s into one {@code Observable}, without any transformation.
30 * <p>
31 * <img width="640" height="380" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/merge.png" alt="">
32 * <p>
33 * You can combine the items emitted by multiple {@code Observable}s so that they act like a single {@code Observable}, by using the merge operation.
34 * <p>
35 * The {@code instance(true)} call behaves like {@link OperatorMerge} except that if any of the merged Observables notify of
36 * an error via {@code onError}, {@code mergeDelayError} will refrain from propagating that error
37 * notification until all of the merged Observables have finished emitting items.
38 * <p>
39 * <img width="640" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/mergeDelayError.png" alt="">
40 * <p>
41 * Even if multiple merged Observables send {@code onError} notifications, {@code mergeDelayError} will
42 * only invoke the {@code onError} method of its Observers once.
43 * <p>
44 * This operation allows an Observer to receive all successfully emitted items from all of the
45 * source Observables without being interrupted by an error notification from one of them.
46 * <p>
47 * <em>Note:</em> If this is used on an Observable that never completes, it will never call {@code onError} and will effectively swallow errors.
48
49 * @param <T>
50 * the type of the items emitted by both the source and merged {@code Observable}s
51 */
52 public class OperatorMerge<T> implements Operator<T, Observable<? extends T>> {
53 /** Lazy initialization via inner-class holder. */
54 private static final class HolderNoDelay {
55 /** A singleton instance. */
56 static final OperatorMerge<Object> INSTANCE = new OperatorMerge<Object>(false);
57 }
58 /** Lazy initialization via inner-class holder. */
59 private static final class HolderDelayErrors {
60 /** A singleton instance. */
61 static final OperatorMerge<Object> INSTANCE = new OperatorMerge<Object>(true);
62 }
63 /**
64 * @param delayErrors should the merge delay errors?
65 * @return a singleton instance of this stateless operator.
66 */
67 @SuppressWarnings("unchecked")
68 public static <T> OperatorMerge<T> instance(boolean delayErrors) {
69 if (delayErrors) {
70 return (OperatorMerge<T>)HolderDelayErrors.INSTANCE;
71 }
72 return (OperatorMerge<T>)HolderNoDelay.INSTANCE;
73 }
74 /*
75 * benjchristensen => This class is complex and I'm not a fan of it despite writing it. I want to give some background
76 * as to why for anyone who wants to try and help improve it.
77 *
78 * One of my first implementations that added backpressure support (Producer.request) was fairly elegant and used a simple
79 * queue draining approach. It was simple to understand as all onNext were added to their queues, then a single winner
80 * would drain the queues, similar to observeOn. It killed the Netflix API when I canaried it. There were two problems:
81 * (1) performance and (2) object allocation overhead causing massive GC pressure. Remember that merge is one of the most
82 * used operators (mostly due to flatmap) and is therefore critical to and a limiter of performance in any application.
83 *
84 * All subsequent work on this class and the various fast-paths and branches within it have been to achieve the needed functionality
85 * while reducing or eliminating object allocation and keeping performance acceptable.
86 *
87 * This has meant adopting strategies such as:
88 *
89 * - ring buffers instead of growable queues
90 * - object pooling
91 * - skipping request logic when downstream does not need backpressure
92 * - ScalarValueQueue for optimizing synchronous single-value Observables
93 * - adopting data structures that use Unsafe (and gating them based on environment so non-Oracle JVMs still work)
94 *
95 * It has definitely increased the complexity and maintenance cost of this class, but the performance gains have been significant.
96 *
97 * The biggest cost of the increased complexity is concurrency bugs and reasoning through what's going on.
98 *
99 * I'd love to have contributions that improve this class, but keep in mind the performance and GC pressure.
100 * The benchmarks I use are in the JMH OperatorMergePerf class. GC memory pressure is tested using Java Flight Recorder
101 * to track object allocation.
102 */
103
104 private OperatorMerge() {
105 this.delayErrors = false;
106 }
107
108 private OperatorMerge(boolean delayErrors) {
109 this.delayErrors = delayErrors;
110 }
111
112 private final boolean delayErrors;
113
114 @Override
115 public Subscriber<Observable<? extends T>> call(final Subscriber<? super T> child) {
116 return new MergeSubscriber<T>(child, delayErrors);
117
118 }
119
120 private static final class MergeSubscriber<T> extends Subscriber<Observable<? extends T>> {
121 final NotificationLite<T> on = NotificationLite.instance();
122 final Subscriber<? super T> actual;
123 private final MergeProducer<T> mergeProducer;
124 private int wip;
125 private boolean completed;
126 private final boolean delayErrors;
127 private ConcurrentLinkedQueue<Throwable> exceptions;
128
129 private volatile SubscriptionIndexedRingBuffer<InnerSubscriber<T>> childrenSubscribers;
130
131 private volatile RxRingBuffer scalarValueQueue = null;
132
133 /* protected by lock on MergeSubscriber instance */
134 private int missedEmitting = 0;
135 private boolean emitLock = false;
136
137 /**
138 * Using synchronized(this) for `emitLock` instead of ReentrantLock or AtomicInteger is faster when there is no contention.
139 *
140 * <pre> {@code
141 * Using ReentrantLock:
142 * r.o.OperatorMergePerf.merge1SyncStreamOfN 1000 thrpt 5 44185.294 1295.565 ops/s
143 *
144 * Using synchronized(this):
145 * r.o.OperatorMergePerf.merge1SyncStreamOfN 1000 thrpt 5 79715.981 3704.486 ops/s
146 *
147 * Still slower though than allowing concurrency:
148 * r.o.OperatorMergePerf.merge1SyncStreamOfN 1000 thrpt 5 149331.046 4851.290 ops/s
149 * } </pre>
150 */
151
152 public MergeSubscriber(Subscriber<? super T> actual, boolean delayErrors) {
153 super(actual);
154 this.actual = actual;
155 this.mergeProducer = new MergeProducer<T>(this);
156 this.delayErrors = delayErrors;
157 // decoupled the subscription chain because we need to decouple and control backpressure
158 actual.add(this);
159 actual.setProducer(mergeProducer);
160 }
161
162 @Override
163 public void onStart() {
164 // we request backpressure so we can handle long-running Observables that are enqueueing, such as flatMap use cases
165 // we decouple the Producer chain while keeping the Subscription chain together (perf benefit) via super(actual)
166 request(RxRingBuffer.SIZE);
167 }
168
169 /*
170 * This is expected to be executed sequentially as per the Rx contract or it will not work.
171 */
172 @Override
173 public void onNext(Observable<? extends T> t) {
174 if (t instanceof ScalarSynchronousObservable) {
175 ScalarSynchronousObservable<? extends T> t2 = (ScalarSynchronousObservable<? extends T>)t;
176 handleScalarSynchronousObservable(t2);
177 } else {
178 if (t == null || isUnsubscribed()) {
179 return;
180 }
181 synchronized (this) {
182 // synchronized here because `wip` can be concurrently changed by children Observables
183 wip++;
184 }
185 handleNewSource(t);
186 }
187 }
188
189 private void handleNewSource(Observable<? extends T> t) {
190 if (childrenSubscribers == null) {
191 // lazily create this only if we receive Observables we need to subscribe to
192 childrenSubscribers = new SubscriptionIndexedRingBuffer<InnerSubscriber<T>>();
193 add(childrenSubscribers);
194 }
195 MergeProducer<T> producerIfNeeded = null;
196 // if we have received a request then we need to respect it, otherwise we fast-path
197 if (mergeProducer.requested != Long.MAX_VALUE) {
198 /**
199 * <pre> {@code
200 * With this optimization:
201 *
202 * r.o.OperatorMergePerf.merge1SyncStreamOfN 1000 thrpt 5 57100.080 4686.331 ops/s
203 * r.o.OperatorMergePerf.merge1SyncStreamOfN 1000000 thrpt 5 60.875 1.622 ops/s
204 *
205 * Without this optimization:
206 *
207 * r.o.OperatorMergePerf.merge1SyncStreamOfN 1000 thrpt 5 29863.945 1858.002 ops/s
208 * r.o.OperatorMergePerf.merge1SyncStreamOfN 1000000 thrpt 5 30.516 1.087 ops/s
209 * } </pre>
210 */
211 producerIfNeeded = mergeProducer;
212 }
213 InnerSubscriber<T> i = new InnerSubscriber<T>(this, producerIfNeeded);
214 i.sindex = childrenSubscribers.add(i);
215 t.unsafeSubscribe(i);
216 if (!isUnsubscribed()) {
217 request(1);
218 }
219 }
220
221 private void handleScalarSynchronousObservable(ScalarSynchronousObservable<? extends T> t) {
222 // fast-path for scalar, synchronous values such as Observable.from(int)
223 /**
224 * Without this optimization:
225 *
226 * <pre> {@code
227 * Benchmark (size) Mode Samples Score Score error Units
228 * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1 1 thrpt 5 2,418,452.409 130572.665 ops/s
229 * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1 1000 thrpt 5 5,690.456 94.958 ops/s
230 * r.o.OperatorMergePerf.merge1SyncStreamOfN 1000000 thrpt 5 takes too long
231 *
232 * With this optimization:
233 *
234 * r.o.OperatorMergePerf.merge1SyncStreamOfN 1 thrpt 5 5,475,300.198 156741.334 ops/s
235 * r.o.OperatorMergePerf.merge1SyncStreamOfN 1000 thrpt 5 68,932.278 1311.023 ops/s
236 * r.o.OperatorMergePerf.merge1SyncStreamOfN 1000000 thrpt 5 64.405 0.611 ops/s
237 * } </pre>
238 *
239 */
240 if (mergeProducer.requested == Long.MAX_VALUE) {
241 handleScalarSynchronousObservableWithoutRequestLimits(t);
242 } else {
243 handleScalarSynchronousObservableWithRequestLimits(t);
244 }
245 }
246
247 private void handleScalarSynchronousObservableWithoutRequestLimits(ScalarSynchronousObservable<? extends T> t) {
248 T value = t.get();
249 if (getEmitLock()) {
250 boolean moreToDrain;
251 try {
252 actual.onNext(value);
253 } finally {
254 moreToDrain = releaseEmitLock();
255 }
256 if (moreToDrain) {
257 drainQueuesIfNeeded();
258 }
259 request(1);
260 return;
261 } else {
262 try {
263 getOrCreateScalarValueQueue().onNext(value);
264 } catch (MissingBackpressureException e) {
265 onError(e);
266 }
267 return;
268 }
269 }
270
271 private void handleScalarSynchronousObservableWithRequestLimits(ScalarSynchronousObservable<? extends T> t) {
272 if (getEmitLock()) {
273 boolean emitted = false;
274 boolean moreToDrain;
275 boolean isReturn = false;
276 try {
277 long r = mergeProducer.requested;
278 if (r > 0) {
279 emitted = true;
280 actual.onNext(t.get());
281 MergeProducer.REQUESTED.decrementAndGet(mergeProducer);
282 // we handle this Observable without ever incrementing the wip or touching other machinery so just return here
283 isReturn = true;
284 }
285 } finally {
286 moreToDrain = releaseEmitLock();
287 }
288 if (moreToDrain) {
289 drainQueuesIfNeeded();
290 }
291 if (emitted) {
292 request(1);
293 }
294 if (isReturn) {
295 return;
296 }
297 }
298
299 // if we didn't return above we need to enqueue
300 // enqueue the values for later delivery
301 try {
302 getOrCreateScalarValueQueue().onNext(t.get());
303 } catch (MissingBackpressureException e) {
304 onError(e);
305 }
306 }
307
308 private RxRingBuffer getOrCreateScalarValueQueue() {
309 RxRingBuffer svq = scalarValueQueue;
310 if (svq == null) {
311 svq = RxRingBuffer.getSpscInstance();
312 scalarValueQueue = svq;
313 }
314 return svq;
315 }
316
317 private synchronized boolean releaseEmitLock() {
318 emitLock = false;
319 if (missedEmitting == 0) {
320 return false;
321 } else {
322 return true;
323 }
324 }
325
326 private synchronized boolean getEmitLock() {
327 if (emitLock) {
328 missedEmitting++;
329 return false;
330 } else {
331 emitLock = true;
332 missedEmitting = 0;
333 return true;
334 }
335 }
336
337 private boolean drainQueuesIfNeeded() {
338 while (true) {
339 if (getEmitLock()) {
340 int emitted = 0;
341 boolean moreToDrain;
342 try {
343 emitted = drainScalarValueQueue();
344 drainChildrenQueues();
345 } finally {
346 moreToDrain = releaseEmitLock();
347 }
348 // request outside of lock
349 if (emitted > 0) {
350 request(emitted);
351 }
352 if (!moreToDrain) {
353 return true;
354 }
355 // otherwise we'll loop and get whatever was added
356 } else {
357 return false;
358 }
359 }
360 }
361
362 int lastDrainedIndex = 0;
363
364 /**
365 * ONLY call when holding the EmitLock.
366 */
367 private void drainChildrenQueues() {
368 if (childrenSubscribers != null) {
369 lastDrainedIndex = childrenSubscribers.forEach(DRAIN_ACTION, lastDrainedIndex);
370 }
371 }
372
373 /**
374 * ONLY call when holding the EmitLock.
375 */
376 private int drainScalarValueQueue() {
377 RxRingBuffer svq = scalarValueQueue;
378 if (svq != null) {
379 long r = mergeProducer.requested;
380 int emittedWhileDraining = 0;
381 if (r < 0) {
382 // drain it all
383 Object o = null;
384 while ((o = svq.poll()) != null) {
385 on.accept(actual, o);
386 emittedWhileDraining++;
387 }
388 } else if (r > 0) {
389 // drain what was requested
390 long toEmit = r;
391 for (int i = 0; i < toEmit; i++) {
392 Object o = svq.poll();
393 if (o == null) {
394 break;
395 } else {
396 on.accept(actual, o);
397 emittedWhileDraining++;
398 }
399 }
400 // decrement the number we emitted from outstanding requests
401 MergeProducer.REQUESTED.getAndAdd(mergeProducer, -emittedWhileDraining);
402 }
403 return emittedWhileDraining;
404 }
405 return 0;
406 }
407
408 final Func1<InnerSubscriber<T>, Boolean> DRAIN_ACTION = new Func1<InnerSubscriber<T>, Boolean>() {
409
410 @Override
411 public Boolean call(InnerSubscriber<T> s) {
412 if (s.q != null) {
413 long r = mergeProducer.requested;
414 int emitted = s.drainQueue();
415 if (emitted > 0) {
416 s.requestMore(emitted);
417 }
418 if (emitted == r) {
419 // we emitted as many as were requested so stop the forEach loop
420 return Boolean.FALSE;
421 }
422 }
423 return Boolean.TRUE;
424 }
425
426 };
427
428 @Override
429 public void onError(Throwable e) {
430 if (!completed) {
431 completed = true;
432 innerError(e, true);
433 }
434 }
435
436 private void innerError(Throwable e, boolean parent) {
437 if (delayErrors) {
438 synchronized (this) {
439 if (exceptions == null) {
440 exceptions = new ConcurrentLinkedQueue<Throwable>();
441 }
442 }
443 exceptions.add(e);
444 boolean sendOnComplete = false;
445 synchronized (this) {
446 if (!parent) {
447 wip--;
448 }
449 if ((wip == 0 && completed) || (wip < 0)) {
450 sendOnComplete = true;
451 }
452 }
453 if (sendOnComplete) {
454 drainAndComplete();
455 }
456 } else {
457 actual.onError(e);
458 }
459 }
460
461 @Override
462 public void onCompleted() {
463 boolean c = false;
464 synchronized (this) {
465 completed = true;
466 if (wip == 0) {
467 c = true;
468 }
469 }
470 if (c) {
471 // complete outside of lock
472 drainAndComplete();
473 }
474 }
475
476 void completeInner(InnerSubscriber<T> s) {
477 boolean sendOnComplete = false;
478 synchronized (this) {
479 wip--;
480 if (wip == 0 && completed) {
481 sendOnComplete = true;
482 }
483 }
484 childrenSubscribers.remove(s.sindex);
485 if (sendOnComplete) {
486 drainAndComplete();
487 }
488 }
489
490 private void drainAndComplete() {
491 boolean moreToDrain = true;
492 while (moreToDrain) {
493 synchronized (this) {
494 missedEmitting = 0;
495 }
496 drainScalarValueQueue();
497 drainChildrenQueues();
498 synchronized (this) {
499 moreToDrain = missedEmitting > 0;
500 }
501 }
502 RxRingBuffer svq = scalarValueQueue;
503 if (svq == null || svq.isEmpty()) {
504 if (delayErrors) {
505 Queue<Throwable> es = null;
506 synchronized (this) {
507 es = exceptions;
508 }
509 if (es != null) {
510 if (es.isEmpty()) {
511 actual.onCompleted();
512 } else if (es.size() == 1) {
513 actual.onError(es.poll());
514 } else {
515 actual.onError(new CompositeException(es));
516 }
517 } else {
518 actual.onCompleted();
519 }
520 } else {
521 actual.onCompleted();
522 }
523 }
524 }
525
526 }
527
528 private static final class MergeProducer<T> implements Producer {
529
530 private final MergeSubscriber<T> ms;
531
532 public MergeProducer(MergeSubscriber<T> ms) {
533 this.ms = ms;
534 }
535
536 private volatile long requested = 0;
537 @SuppressWarnings("rawtypes")
538 static final AtomicLongFieldUpdater<MergeProducer> REQUESTED = AtomicLongFieldUpdater.newUpdater(MergeProducer.class, "requested");
539
540 @Override
541 public void request(long n) {
542 if (requested == Long.MAX_VALUE) {
543 return;
544 }
545 if (n == Long.MAX_VALUE) {
546 requested = Long.MAX_VALUE;
547 } else {
548 BackpressureUtils.getAndAddRequest(REQUESTED, this, n);
549 if (ms.drainQueuesIfNeeded()) {
550 boolean sendComplete = false;
551 synchronized (ms) {
552 if (ms.wip == 0 && ms.scalarValueQueue != null && ms.scalarValueQueue.isEmpty()) {
553 sendComplete = true;
554 }
555 }
556 if (sendComplete) {
557 ms.drainAndComplete();
558 }
559 }
560 }
561 }
562
563 }
564
565 private static final class InnerSubscriber<T> extends Subscriber<T> {
566 public int sindex;
567 final MergeSubscriber<T> parentSubscriber;
568 final MergeProducer<T> producer;
569 /** Make sure the inner termination events are delivered only once. */
570 @SuppressWarnings("unused")
571 volatile int terminated;
572 @SuppressWarnings("rawtypes")
573 static final AtomicIntegerFieldUpdater<InnerSubscriber> ONCE_TERMINATED = AtomicIntegerFieldUpdater.newUpdater(InnerSubscriber.class, "terminated");
574
575 private final RxRingBuffer q = RxRingBuffer.getSpscInstance();
576
577 public InnerSubscriber(MergeSubscriber<T> parent, MergeProducer<T> producer) {
578 this.parentSubscriber = parent;
579 this.producer = producer;
580 add(q);
581 request(q.capacity());
582 }
583
584 @Override
585 public void onNext(T t) {
586 emit(t, false);
587 }
588
589 @Override
590 public void onError(Throwable e) {
591 // it doesn't go through queues, it immediately onErrors and tears everything down
592 if (ONCE_TERMINATED.compareAndSet(this, 0, 1)) {
593 parentSubscriber.innerError(e, false);
594 }
595 }
596
597 @Override
598 public void onCompleted() {
599 if (ONCE_TERMINATED.compareAndSet(this, 0, 1)) {
600 emit(null, true);
601 }
602 }
603
604 public void requestMore(long n) {
605 request(n);
606 }
607
608 private void emit(T t, boolean complete) {
609 boolean drain = false;
610 boolean enqueue = true;
611 /**
612 * This optimization to skip the queue is messy ... but it makes a big difference in performance when merging a single stream
613 * with many values, or many intermittent streams without contention. It doesn't make much of a difference if there is contention.
614 *
615 * Below are some of the relevant benchmarks to show the difference.
616 *
617 * <pre> {@code
618 * With this fast-path:
619 *
620 * Benchmark (size) Mode Samples Score Score error Units
621 * r.o.OperatorMergePerf.merge1SyncStreamOfN 1 thrpt 5 5344143.680 393484.592 ops/s
622 * r.o.OperatorMergePerf.merge1SyncStreamOfN 1000 thrpt 5 83582.662 4293.755 ops/s +++
623 * r.o.OperatorMergePerf.merge1SyncStreamOfN 1000000 thrpt 5 73.889 4.477 ops/s +++
624 *
625 * r.o.OperatorMergePerf.mergeNSyncStreamsOfN 1 thrpt 5 5799265.333 199205.296 ops/s +
626 * r.o.OperatorMergePerf.mergeNSyncStreamsOfN 1000 thrpt 5 62.655 2.521 ops/s +++
627 *
628 * r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN 1 thrpt 5 76925.616 4909.174 ops/s
629 * r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN 1000 thrpt 5 3634.977 242.469 ops/s
630 *
631 * Without:
632 *
633 * Benchmark (size) Mode Samples Score Score error Units
634 * r.o.OperatorMergePerf.merge1SyncStreamOfN 1 thrpt 5 5099295.678 159539.842 ops/s
635 * r.o.OperatorMergePerf.merge1SyncStreamOfN 1000 thrpt 5 18196.671 10053.298 ops/s
636 * r.o.OperatorMergePerf.merge1SyncStreamOfN 1000000 thrpt 5 19.184 1.028 ops/s
637 *
638 * r.o.OperatorMergePerf.mergeNSyncStreamsOfN 1 thrpt 5 5591612.719 591821.763 ops/s
639 * r.o.OperatorMergePerf.mergeNSyncStreamsOfN 1000 thrpt 5 21.018 3.251 ops/s
640 *
641 * r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN 1 thrpt 5 72692.073 18395.031 ops/s
642 * r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN 1000 thrpt 5 4379.093 386.368 ops/s
643 * } </pre>
644 *
645 * It looks like it may cause a slowdown in highly contended cases (like 'mergeTwoAsyncStreamsOfN' above) as instead of just
646 * putting in the queue, it attempts to get the lock. We are optimizing for the non-contended case.
647 */
648 if (parentSubscriber.getEmitLock()) {
649 long emitted = 0;
650 enqueue = false;
651 try {
652 // drain the queue if there is anything in it before emitting the current value
653 emitted += drainQueue();
654 // }
655 if (producer == null) {
656 // no backpressure requested
657 if (complete) {
658 parentSubscriber.completeInner(this);
659 } else {
660 try {
661 parentSubscriber.actual.onNext(t);
662 } catch (Throwable e) {
663 // special error handling due to complexity of merge
664 onError(OnErrorThrowable.addValueAsLastCause(e, t));
665 }
666 emitted++;
667 }
668 } else {
669 // this needs to check q.count() as draining above may not have drained the full queue
670 // perf tests show this to be okay, though different queue implementations could perform poorly with this
671 if (producer.requested > 0 && q.count() == 0) {
672 if (complete) {
673 parentSubscriber.completeInner(this);
674 } else {
675 try {
676 parentSubscriber.actual.onNext(t);
677 } catch (Throwable e) {
678 // special error handling due to complexity of merge
679 onError(OnErrorThrowable.addValueAsLastCause(e, t));
680 }
681 emitted++;
682 MergeProducer.REQUESTED.decrementAndGet(producer);
683 }
684 } else {
685 // no requests available, so enqueue it
686 enqueue = true;
687 }
688 }
689 } finally {
690 drain = parentSubscriber.releaseEmitLock();
691 }
692 // request upstream what we just emitted
693 if(emitted > 0) {
694 request(emitted);
695 }
696 }
697 if (enqueue) {
698 enqueue(t, complete);
699 drain = true;
700 }
701 if (drain) {
702 /**
703 * This extra check for whether to call drain is ugly, but it helps:
704 * <pre> {@code
705 * Without:
706 * r.o.OperatorMergePerf.mergeNSyncStreamsOfN 1000 thrpt 5 61.812 1.455 ops/s
707 *
708 * With:
709 * r.o.OperatorMergePerf.mergeNSyncStreamsOfN 1000 thrpt 5 78.795 1.766 ops/s
710 * } </pre>
711 */
712 parentSubscriber.drainQueuesIfNeeded();
713 }
714 }
715
716 private void enqueue(T t, boolean complete) {
717 try {
718 if (complete) {
719 q.onCompleted();
720 } else {
721 q.onNext(t);
722 }
723 } catch (MissingBackpressureException e) {
724 onError(e);
725 }
726 }
727
728 private int drainRequested() {
729 int emitted = 0;
730 // drain what was requested
731 long toEmit = producer.requested;
732 Object o;
733 for (int i = 0; i < toEmit; i++) {
734 o = q.poll();
735 if (o == null) {
736 // no more items
737 break;
738 } else if (q.isCompleted(o)) {
739 parentSubscriber.completeInner(this);
740 } else {
741 try {
742 if (!q.accept(o, parentSubscriber.actual)) {
743 emitted++;
744 }
745 } catch (Throwable e) {
746 // special error handling due to complexity of merge
747 onError(OnErrorThrowable.addValueAsLastCause(e, o));
748 }
749 }
750 }
751
752 // decrement the number we emitted from outstanding requests
753 MergeProducer.REQUESTED.getAndAdd(producer, -emitted);
754 return emitted;
755 }
756
757 private int drainAll() {
758 int emitted = 0;
759 // drain it all
760 Object o;
761 while ((o = q.poll()) != null) {
762 if (q.isCompleted(o)) {
763 parentSubscriber.completeInner(this);
764 } else {
765 try {
766 if (!q.accept(o, parentSubscriber.actual)) {
767 emitted++;
768 }
769 } catch (Throwable e) {
770 // special error handling due to complexity of merge
771 onError(OnErrorThrowable.addValueAsLastCause(e, o));
772 }
773 }
774 }
775 return emitted;
776 }
777
778 private int drainQueue() {
779 if (producer != null) {
780 return drainRequested();
781 } else {
782 return drainAll();
783 }
784 }
785 }
786 }